psycopg2数据库连接池使用

您所在的位置:网站首页 python oracle数据库连接池 psycopg2数据库连接池使用

psycopg2数据库连接池使用

2024-06-11 22:35| 来源: 网络整理| 查看: 265

使用psycopg2的连接池的时候遇到了一些报错,总有一些意外结果。网上能查到的信息很少而且充满错误。根据报错追踪源码,源码出乎意料的少(短小精悍)。你以为看源码是麻烦的,但有时候它反而是解决问题最快的方式。

# Connection pool overview: psycopg2's connection pool is very simple and its
# source is only a few lines long. Official documentation:
# https://www.psycopg.org/docs/pool.html. The source is as follows:
import psycopg2
from psycopg2 import extensions as _ext


class PoolError(psycopg2.Error):
    pass


class AbstractConnectionPool(object):
    """Generic key-based pooling code."""

    def __init__(self, minconn, maxconn, *args, **kwargs):
        """Initialize the connection pool.

        New 'minconn' connections are created immediately calling 'connfunc'
        with given parameters. The connection pool will support a maximum of
        about 'maxconn' connections.
        """
        self.minconn = int(minconn)
        self.maxconn = int(maxconn)
        self.closed = False

        # Positional/keyword arguments forwarded to psycopg2.connect().
        self._args = args
        self._kwargs = kwargs

        self._pool = []     # idle connections, ready to be handed out
        self._used = {}     # key -> connection currently handed out
        self._rused = {}    # id(conn) -> key map
        self._keys = 0      # counter used by _getkey() to mint new keys

        # Pre-open 'minconn' idle connections.
        for i in range(self.minconn):
            self._connect()

    def _connect(self, key=None):
        """Create a new connection and assign it to 'key' if not None."""
        conn = psycopg2.connect(*self._args, **self._kwargs)
        if key is not None:
            # Keyed: the connection goes straight to the "in use" maps.
            self._used[key] = conn
            self._rused[id(conn)] = key
        else:
            # Unkeyed: the connection is parked in the idle list.
            self._pool.append(conn)
        return conn

    def _getkey(self):
        """Return a new unique key."""
        self._keys += 1
        return self._keys

    def _getconn(self, key=None):
        """Get a free connection and assign it to 'key' if not None."""
        if self.closed:
            raise PoolError("connection pool is closed")
        if key is None:
            key = self._getkey()

        # A key that already holds a connection gets that same connection.
        if key in self._used:
            return self._used[key]

        if self._pool:
            # Reuse an idle connection.
            self._used[key] = conn = self._pool.pop()
            self._rused[id(conn)] = key
            return conn
        else:
            # No idle connection: open a new one unless 'maxconn'
            # connections are already in use.
            if len(self._used) == self.maxconn:
                raise PoolError("connection pool exhausted")
            return self._connect(key)

    def _putconn(self, conn, key=None, close=False):
        """Put away a connection."""
        if self.closed:
            raise PoolError("connection pool is closed")

        if key is None:
            # No key given: look the connection up by identity.
            key = self._rused.get(id(conn))
            if key is None:
                raise PoolError("trying to put unkeyed connection")

        if len(self._pool) < self.minconn and not close:
            # Return the connection into a consistent state before putting
            # it back into the pool
            if not conn.closed:
                status = conn.info.transaction_status
                if status == _ext.TRANSACTION_STATUS_UNKNOWN:
                    # server connection lost
                    conn.close()
                elif status != _ext.TRANSACTION_STATUS_IDLE:
                    # connection in error or in transaction
                    conn.rollback()
                    self._pool.append(conn)
                else:
                    # regular idle connection
                    self._pool.append(conn)
            # If the connection is closed, we just discard it.
        else:
            # The idle list already holds 'minconn' connections, or the
            # caller asked for the connection to be closed.
            conn.close()

        # here we check for the presence of key because it can happen that a
        # thread tries to put back a connection after a call to close
        if not self.closed or key in self._used:
            del self._used[key]
            del self._rused[id(conn)]

    def _closeall(self):
        """Close all connections.

        Note that this can lead to some code fail badly when trying to use
        an already closed connection. If you call .closeall() make sure
        your code can deal with it.
        """
        if self.closed:
            raise PoolError("connection pool is closed")
        # Both idle and in-use connections are closed; errors raised by an
        # individual close() are ignored.
        for conn in self._pool + list(self._used.values()):
            try:
                conn.close()
            except Exception:
                pass
        self.closed = True


class SimpleConnectionPool(AbstractConnectionPool):
    """A connection pool that can't be shared across different threads."""

    # The base-class implementations are exposed as the public API as-is.
    getconn = AbstractConnectionPool._getconn
    putconn = AbstractConnectionPool._putconn
    closeall = AbstractConnectionPool._closeall


class ThreadedConnectionPool(AbstractConnectionPool):
    """A connection pool that works with the threading module."""

    def __init__(self, minconn, maxconn, *args, **kwargs):
        """Initialize the threading lock."""
        import threading
        AbstractConnectionPool.__init__(
            self, minconn, maxconn, *args, **kwargs)
        self._lock = threading.Lock()

    def getconn(self, key=None):
        """Get a free connection and assign it to 'key' if not None."""
        # The lock is held for the whole call and released even on error.
        self._lock.acquire()
        try:
            return self._getconn(key)
        finally:
            self._lock.release()

    def putconn(self, conn=None, key=None, close=False):
        """Put away an unused connection."""
        self._lock.acquire()
        try:
            self._putconn(conn, key, close)
        finally:
            self._lock.release()

    def closeall(self):
        """Close all connections (even the one currently in use.)"""
        self._lock.acquire()
        try:
            self._closeall()
        finally:
            self._lock.release()

AbstractConnectionPool是一个基类,SimpleConnectionPool是基类的简单实现,ThreadedConnectionPool是线程安全的实现。从上面的源码中也能看出,ThreadedConnectionPool 类的 _lock 是一个 threading.Lock() 实例,它保证同一时间只有一个线程能对连接池执行 get、put、close 操作。

报错信息解释 "connection pool exhausted"

说明数据库连接池已经满了,达到了设置的最大连接数。ThreadedConnectionPool类当连接数超出时并不会去等待空闲的连接,而是直接报错了。在多线程连接的时候会经常遇到这个报错。

"trying to put unkeyed connection"

通过阅读源码,能够看出报该错的条件。getconn方法得到连接的同时会把连接记录到 _used 和 _rused 中。putconn方法不是把连接close了,而是把连接放回了池中(可以先简单的这么认为)。如果调用putconn方法时没有传入key,并且在 _rused 中也没有找到该连接的记录,说明这个连接并没有“正在使用中”,它可能已经close了,也可能已经放回池中了(放回 _pool 列表中了)。试图归还(putconn)一个不在使用中的连接就会报这个错。

在多线程连接中经常遇到这个报错,比如这种场景:有两个线程在池中同时获得了同一个连接(这是有可能的,通过getconn(key)指定相同的key会返回同一个连接),而其中一个线程把连接putconn到池中或close了,然后另一个线程调用putconn方法时在_rused中就找不到记录了,因为这个连接已经在池中或close了,然后就会报错。

# Connection pool sample code
from contextlib import contextmanager
from threading import Semaphore

import psycopg2
from psycopg2 import pool, extensions

# Connection parameters passed to psycopg2.connect(); replace the placeholders.
pgsql_config = {
    'user': '***',
    'password': '***',
    'host': '***',
    'port': '***',
    'database': '***'
}


class ReallyThreadedConnectionPool(psycopg2.pool.ThreadedConnectionPool):
    """ThreadedConnectionPool whose getconn() waits for a free slot.

    A semaphore with 'maxconn' permits is acquired before a connection is
    taken and released after it is put back, so a thread asking for a
    connection while all permits are taken blocks until one is returned.
    """

    def __init__(self, minconn, maxconn, *args, **kwargs):
        """Create the semaphore, then initialize the underlying pool."""
        # int() mirrors the coercion the base pool applies to 'maxconn'.
        self._semaphore = Semaphore(int(maxconn))
        super().__init__(minconn, maxconn, *args, **kwargs)

    def getconn(self, key=None):
        """Block until a permit is available, then return a connection."""
        self._semaphore.acquire()
        try:
            return super().getconn(key)
        except BaseException:
            # No connection was handed out, so give the permit back;
            # otherwise every failed getconn() would shrink the pool.
            self._semaphore.release()
            raise

    def putconn(self, *args, **kwargs):
        """Return a connection to the pool and free its permit.

        The permit is released only when the underlying putconn() succeeds.
        """
        super().putconn(*args, **kwargs)
        self._semaphore.release()


cnxpool = ReallyThreadedConnectionPool(5, 10, **pgsql_config)


@contextmanager
def get_cursor():
    """Yield a cursor on a pooled connection and always clean up.

    A psycopg2.Error raised inside the ``with`` body is printed and
    suppressed. Errors raised while acquiring the connection or creating
    the cursor propagate to the caller.
    """
    # Acquire outside the try blocks: if this fails there is nothing to
    # clean up, and the real error reaches the caller.
    con = cnxpool.getconn()
    try:
        cursor = con.cursor()
        try:
            yield cursor
        except psycopg2.Error as e:
            print(e)
        finally:
            cursor.close()
    finally:
        # Runs even if cursor creation or cursor.close() failed, so the
        # connection (and its semaphore permit) is never leaked.
        cnxpool.putconn(con)


class PyPgsql(object):
    """Small helper namespace for running queries through the pool."""

    @staticmethod
    def get_all(sql):
        """Execute 'sql' and return all rows.

        Returns None when get_cursor() suppressed a psycopg2.Error.
        """
        with get_cursor() as cursor:
            cursor.execute(sql)
            return cursor.fetchall()


if __name__ == '__main__':
    import time
    from concurrent.futures import ThreadPoolExecutor

    def t(n):
        r = PyPgsql.get_all("select * from TABLE")
        print(r)

    s = time.time()
    # Named 'executor' so it does not shadow the imported psycopg2 'pool'.
    with ThreadPoolExecutor(max_workers=15) as executor:
        for i in range(20):
            executor.submit(t, i)
    e = time.time()
    print(e - s)

思路方式就是:

利用信号量来控制多线程获取的连接数,当一个线程获取连接后,信号量减一。当一个线程释放连接时,信号量加一。当信号量为0时,线程就会等待。



【本文地址】


今日新闻


推荐新闻


    CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3